# 1. Define a function to calculate indicators for a single dataframe
def calculate_indicators(data):
# Ensure data is sorted by date
data = data.sort_values('date').reset_index(drop=True)
# Trend Indicators
data.ta.sma(length=50, append=True) # Simple Moving Average (50)
data.ta.sma(length=200, append=True) # Simple Moving Average (200)
data.ta.ema(length=20, append=True) # Exponential Moving Average (20)
# Momentum Indicators
data.ta.rsi(length=14, append=True) # Relative Strength Index (14)
# Volatility Indicators - Bollinger Bands
# Using bbands with standard parameters
bbands = data.ta.bbands(length=20, std=2, append=True)
# Trend Following (MACD)
data.ta.macd(fast=12, slow=26, signal=9, append=True)
# Money Flow Index (MFI)
data.ta.mfi(length=14, append=True)
# Volume
data['Volume'] = data['volume']
return data
# 2. Apply calculation to all tickers
# Create a list to store processed dataframes
processed_frames = []
# Get list of unique tickers
tickers = df['ticker'].unique()
for ticker in tickers:
# Filter data for specific ticker
ticker_df = df[df['ticker'] == ticker].copy()
# Calculate indicators (need at least 200 days for MA200)
if len(ticker_df) > 200:
try:
ticker_df = calculate_indicators(ticker_df)
processed_frames.append(ticker_df)
print(f"✓ Processed {ticker}: {len(ticker_df)} rows, columns: {len(ticker_df.columns)}")
except Exception as e:
print(f"✗ Error processing {ticker}: {e}")
else:
print(f"✗ Skipped {ticker}: Not enough data ({len(ticker_df)} rows, need at least 200)")
# Combine back into a single main DataFrame
if processed_frames:
df_ta = pd.concat(processed_frames, ignore_index=True)
print(f"\nCombined DataFrame shape: {df_ta.shape}")
print(f"Available columns: {df_ta.columns.tolist()}")
else:
print("No data was processed!")
df_ta = pd.DataFrame()
# 3. Check column names and find Bollinger Bands columns
if not df_ta.empty:
print("\nSearching for Bollinger Bands columns...")
bollinger_cols = [col for col in df_ta.columns if 'BB' in col or 'bb' in col]
print(f"Bollinger Bands columns found: {bollinger_cols}")
# Find the actual column names for BB upper and lower
bb_upper = None
bb_lower = None
bb_middle = None
for col in bollinger_cols:
col_lower = col.lower()
if 'bbu' in col_lower or 'upper' in col_lower:
bb_upper = col
elif 'bbl' in col_lower or 'lower' in col_lower:
bb_lower = col
elif 'bbm' in col_lower or 'middle' in col_lower:
bb_middle = col
print(f"Upper BB column: {bb_upper}")
print(f"Lower BB column: {bb_lower}")
print(f"Middle BB column: {bb_middle}")
# 4. Calculate scores for each indicator with safe column access
def calculate_scores(row):
scores = {}
# Helper function to safely get values
def get_value(col_name, default=np.nan):
if col_name in row and pd.notna(row[col_name]):
return row[col_name]
return default
# MA50 vs Close
ma50 = get_value('SMA_50')
if pd.notna(ma50):
if row['close'] > ma50:
scores['MA50'] = 1
elif row['close'] < ma50:
scores['MA50'] = -1
else:
scores['MA50'] = 0
# MA200 vs Close
ma200 = get_value('SMA_200')
if pd.notna(ma200):
if row['close'] > ma200:
scores['MA200'] = 1
elif row['close'] < ma200:
scores['MA200'] = -1
else:
scores['MA200'] = 0
# EMA20 vs Close
ema20 = get_value('EMA_20')
if pd.notna(ema20):
if row['close'] > ema20:
scores['EMA'] = 1
elif row['close'] < ema20:
scores['EMA'] = -1
else:
scores['EMA'] = 0
# MACD
macd = get_value('MACD_12_26_9') or get_value('MACD')
if pd.notna(macd):
if macd > 0:
scores['MACD'] = 1
elif macd < 0:
scores['MACD'] = -1
else:
scores['MACD'] = 0
# RSI
rsi = get_value('RSI_14')
if pd.notna(rsi):
if rsi > 70:
scores['RSI'] = -1 # Overbought
elif rsi < 30:
scores['RSI'] = 1 # Oversold
else:
scores['RSI'] = 0 # Neutral
# Bollinger Bands
if bb_upper and bb_lower:
bb_upper_val = get_value(bb_upper)
bb_lower_val = get_value(bb_lower)
if pd.notna(bb_upper_val) and pd.notna(bb_lower_val):
if row['close'] > bb_upper_val:
scores['BB'] = -1 # Overbought (above upper band)
elif row['close'] < bb_lower_val:
scores['BB'] = 1 # Oversold (below lower band)
else:
scores['BB'] = 0 # Within bands
# MFI (Money Flow Index)
mfi = get_value('MFI_14')
if pd.notna(mfi):
if mfi > 80:
scores['MFI'] = -1 # Overbought
elif mfi < 20:
scores['MFI'] = 1 # Oversold
else:
scores['MFI'] = 0 # Neutral
# Calculate total score
total_score = sum(scores.values()) if scores else 0
# Determine signal
if total_score > 0:
signal = 'Positive'
elif total_score < 0:
signal = 'Negative'
else:
signal = 'Neutral'
# Add scores and signal to row
row['MA50_Score'] = scores.get('MA50', 0)
row['MA200_Score'] = scores.get('MA200', 0)
row['EMA_Score'] = scores.get('EMA', 0)
row['MACD_Score'] = scores.get('MACD', 0)
row['RSI_Score'] = scores.get('RSI', 0)
row['BB_Score'] = scores.get('BB', 0)
row['MFI_Score'] = scores.get('MFI', 0)
row['Total_Score'] = total_score
row['Signal'] = signal
return row
# Apply scoring function if we have data
if not df_ta.empty:
print("\nCalculating scores for each row...")
df_ta = df_ta.apply(calculate_scores, axis=1)
# 5. Create a summary report table (latest data for each ticker)
# Get the latest date for each ticker
latest_dates = df_ta.groupby('ticker')['date'].max()
# Create summary dataframe
summary_rows = []
for ticker, latest_date in latest_dates.items():
ticker_data = df_ta[(df_ta['ticker'] == ticker) & (df_ta['date'] == latest_date)]
if not ticker_data.empty:
latest_row = ticker_data.iloc[0]
summary_rows.append({
'Ticker': ticker,
'Close Price': latest_row['close'],
'Volume': latest_row['volume'],
'MA50': latest_row.get('SMA_50', np.nan),
'MA200': latest_row.get('SMA_200', np.nan),
'EMA20': latest_row.get('EMA_20', np.nan),
'MACD': latest_row.get('MACD_12_26_9', np.nan),
'RSI': latest_row.get('RSI_14', np.nan),
'BB Upper': latest_row.get(bb_upper, np.nan) if bb_upper else np.nan,
'BB Lower': latest_row.get(bb_lower, np.nan) if bb_lower else np.nan,
'MFI': latest_row.get('MFI_14', np.nan),
'MA50 Score': latest_row['MA50_Score'],
'MA200 Score': latest_row['MA200_Score'],
'EMA Score': latest_row['EMA_Score'],
'MACD Score': latest_row['MACD_Score'],
'RSI Score': latest_row['RSI_Score'],
'BB Score': latest_row['BB_Score'],
'MFI Score': latest_row['MFI_Score'],
'Total Score': latest_row['Total_Score'],
'Signal': latest_row['Signal']
})
# Create summary dataframe
summary_df = pd.DataFrame(summary_rows)
# 6. Display the summary table with styling
def style_scores(val):
if isinstance(val, (int, float)):
if val > 0:
return 'color: green; font-weight: bold;'
elif val < 0:
return 'color: red; font-weight: bold;'
else:
return 'color: black;'
return ''
def style_signal(val):
if val == 'Positive':
return 'background-color: #d4edda; color: #155724; font-weight: bold;'
elif val == 'Negative':
return 'background-color: #f8d7da; color: #721c24; font-weight: bold;'
else:
return 'background-color: #fff3cd; color: #856404; font-weight: bold;'
# Apply styling - using map() instead of applymap() for newer pandas versions
score_columns = ['MA50 Score', 'MA200 Score', 'EMA Score', 'MACD Score',
'RSI Score', 'BB Score', 'MFI Score', 'Total Score']
# Filter columns that actually exist in the dataframe
existing_score_cols = [col for col in score_columns if col in summary_df.columns]
# Create styler object
styler = summary_df.style
# Apply formatting
styler = styler.format({
'Close Price': '{:.2f}',
'MA50': '{:.2f}',
'MA200': '{:.2f}',
'EMA20': '{:.2f}',
'MACD': '{:.4f}',
'RSI': '{:.1f}',
'BB Upper': '{:.2f}',
'BB Lower': '{:.2f}',
'MFI': '{:.1f}',
'Volume': '{:,.0f}'
})
# Apply styling to score columns (use map for newer pandas)
if existing_score_cols:
styler = styler.map(style_scores, subset=existing_score_cols)
# Apply styling to signal column
if 'Signal' in summary_df.columns:
styler = styler.map(style_signal, subset=['Signal'])
# Set properties and table styles
styler = styler.set_properties(**{
'text-align': 'center',
'border': '1px solid #ddd',
'padding': '5px'
})
styler = styler.set_caption("Technical Analysis Summary - Latest Signals")